package com.marion.codestandard.utils;

import lombok.Data;
import org.springframework.util.Assert;

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Hand-written thread pool.
 *
 * <p>The method layout ({@code execute}, {@code addWorker}, {@code runWorker},
 * {@code getTask}, {@code processWorkerExit}) follows the structure of
 * {@link java.util.concurrent.ThreadPoolExecutor}.
 *
 * <p>Configuration ({@code coreThreadSize}, {@code maxThreadSize}, {@code timeout} in seconds
 * and {@code taskQueue}) must be supplied through the generated setters before the first call
 * to {@link #execute(Runnable)}. Internal state (worker count, lock, worker set, shutdown flag
 * and rejection handler) has working defaults, which can still be replaced through the setters.
 *
 * <p>Limitation: setting the shutdown flag does not by itself wake workers that are blocked
 * waiting on the task queue; they notice the flag the next time they are interrupted or
 * receive a task.
 *
 * @author Marion
 * @date 2022/5/30 15:58
 */
@Data
public class MThreadPoolExecutor {

    /** Number of workers that are kept alive even when idle. */
    private int coreThreadSize;
    /** Upper bound on the number of live workers. */
    private int maxThreadSize;
    /** Number of worker slots currently reserved; only modified through atomic operations. */
    private AtomicInteger workerCount = new AtomicInteger();
    /** Guards every access to {@link #workers}. */
    private ReentrantLock lock = new ReentrantLock();
    /** Pending tasks; must be configured by the caller. */
    private BlockingQueue<Runnable> taskQueue;
    /** Live workers; accessed only while holding {@link #lock}. */
    private Set<Worker> workers = new HashSet<>();
    /** Invoked for tasks that can be neither queued nor handed to a new worker. */
    private RejectedHandler rejectedHandler = new RejectedHandler();
    /** {@code true} once the pool has been shut down. */
    private AtomicBoolean isShutDown = new AtomicBoolean();
    /** Idle time, in seconds, after which a worker above the core size retires. */
    private long timeout;

    /**
     * Submits a task for execution.
     *
     * <ol>
     *   <li>Below the core size, a new worker is started with the task as its first task.</li>
     *   <li>Otherwise the task is offered to the queue.</li>
     *   <li>If the queue is full, a worker up to the maximum size is tried.</li>
     *   <li>If that also fails, or the pool is shut down, the rejection handler is invoked.</li>
     * </ol>
     *
     * @param runnable the task to run; must not be {@code null}
     * @throws IllegalArgumentException if {@code runnable} is {@code null}
     */
    public void execute(Runnable runnable) {
        Assert.notNull(runnable, "task can not be null");

        if (isShutDown.get()) {
            rejectedHandler.rejectedHandler(runnable, this);
            return;
        }

        // Fall through to the queue when the core worker could not be added (for example
        // because another thread took the last core slot) instead of dropping the task.
        if (workerCount.get() < coreThreadSize && addWorker(runnable, true)) {
            return;
        }

        if (taskQueue.offer(runnable)) {
            System.out.println("task in queue success");
            // With a core size of 0 there may be no worker left to drain the queue.
            if (workerCount.get() == 0) {
                addWorker(null, false);
            }
        } else if (!addWorker(runnable, false)) {
            // Queue full and no capacity for another worker: reject.
            rejectedHandler.rejectedHandler(runnable, this);
        }
    }

    /**
     * Reserves a worker slot, creates the worker, registers it and starts its thread.
     *
     * @param task first task for the new worker, or {@code null} to let it poll the queue
     * @param core {@code true} to bound the worker count by the core size,
     *             {@code false} to bound it by the maximum size
     * @return {@code true} if the worker thread was started
     */
    private boolean addWorker(Runnable task, boolean core) {
        // 1. Reserve a slot with a CAS loop; give up when shut down or at capacity.
        for (; ; ) {
            if (isShutDown.get()) {
                System.out.println("[addWorker] thread already shut down");
                return false;
            }
            int wc = workerCount.get();
            if (wc >= (core ? coreThreadSize : maxThreadSize)) {
                System.out.println("[addWorker] workerCount exceed");
                return false;
            }
            if (workerCount.compareAndSet(wc, wc + 1)) {
                break;
            }
            // CAS lost a race with another thread: re-read the state and retry.
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker worker = null;
        try {
            // 2. Create the worker and register it under the lock, re-checking the shutdown flag.
            worker = new Worker(task);
            lock.lock();
            try {
                if (!isShutDown.get()) {
                    workers.add(worker);
                    workerAdded = true;
                }
            } finally {
                lock.unlock();
            }

            // 3. Start the thread only after the worker has been registered.
            if (workerAdded) {
                System.out.println("[addWorker] worker start");
                worker.thread.start();
                workerStarted = true;
            }
        } finally {
            // 4. Roll back on any failure, including a failed Thread.start().
            if (!workerStarted) {
                System.out.println("addWorkerFailed");
                addWorkerFailed(worker);
            }
        }

        return workerStarted;
    }

    /**
     * Rolls back a failed worker creation: unregisters the worker, if one was created,
     * and releases the slot that {@code addWorker} reserved.
     *
     * @param worker the worker that failed to start; may be {@code null}
     */
    private void addWorkerFailed(Worker worker) {
        lock.lock();
        try {
            if (worker != null) {
                workers.remove(worker);
            }
            // The slot was reserved before the worker was constructed, so always release it.
            doDecrementWorkerCount();
        } finally {
            lock.unlock();
        }
    }

    /** Atomically decrements the worker count by one. */
    private void doDecrementWorkerCount() {
        workerCount.decrementAndGet();
    }

    /** Atomically increments the worker count by one. */
    private void doIncrementWorkerCount() {
        workerCount.incrementAndGet();
    }

    /**
     * Main loop of a worker thread: runs the worker's first task, then keeps taking tasks
     * from the queue until {@code getTask} returns {@code null}.
     *
     * @param worker the worker whose thread is executing this loop
     */
    public void runWorker(Worker worker) {
        Thread thread = Thread.currentThread();
        Runnable task = worker.firstTask;
        worker.firstTask = null;
        // Stays true when a task throws; only a normal loop exit clears it.
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                worker.lock();
                try {
                    // Once the pool is shut down, make sure the task sees the interrupt flag.
                    if (isShutDown.get() && !thread.isInterrupted()) {
                        thread.interrupt();
                    }
                    task.run();
                } finally {
                    // Clear the reference so the next iteration fetches a new task.
                    task = null;
                    worker.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(worker, completedAbruptly);
        }
    }

    /**
     * Cleans up after a worker leaves its main loop and starts a replacement when needed.
     *
     * @param worker            the exiting worker
     * @param completedAbruptly {@code true} if the worker died because a task threw;
     *                          in that case its slot has not been released yet
     */
    private void processWorkerExit(Worker worker, boolean completedAbruptly) {
        // On a normal exit getTask() has already released the slot.
        if (completedAbruptly) {
            doDecrementWorkerCount();
        }

        lock.lock();
        try {
            workers.remove(worker);
        } finally {
            lock.unlock();
        }

        if (isShutDown.get()) {
            return;
        }

        if (!completedAbruptly) {
            // Keep at least the core size, or one worker while tasks are still queued.
            int min = coreThreadSize;
            if (min == 0 && !taskQueue.isEmpty()) {
                min = 1;
            }
            if (workerCount.get() >= min) {
                return;
            }
        }

        addWorker(null, false);
    }

    /**
     * Fetches the next task for a worker, blocking while the queue is empty.
     *
     * <p>Every {@code null} return releases the caller's worker slot, so the caller must exit.
     *
     * @return the next task, or {@code null} if the worker should exit because the pool is
     *         shut down, the worker count exceeds the maximum, or the worker idled past
     *         {@code timeout} seconds while above the core size
     */
    private Runnable getTask() {
        boolean timedOut = false;
        for (; ; ) {
            if (isShutDown.get()) {
                System.out.println("[getTask] thread has been shut down");
                doDecrementWorkerCount();
                return null;
            }

            int wc = workerCount.get();
            // Only workers above the core size are subject to the idle timeout.
            boolean timed = wc > coreThreadSize;

            // Retire surplus or timed-out workers, but never the last one while work is queued.
            if ((wc > maxThreadSize || (timed && timedOut)) && (wc > 1 || taskQueue.isEmpty())) {
                if (workerCount.compareAndSet(wc, wc - 1)) {
                    return null;
                }
                continue;
            }

            try {
                Runnable r = timed ? taskQueue.poll(timeout, TimeUnit.SECONDS) : taskQueue.take();
                if (r != null) {
                    return r;
                }
                timedOut = true;
            } catch (InterruptedException retry) {
                // Deliberately not propagated: an interrupt only wakes the worker so that
                // the loop re-checks the shutdown flag.
                timedOut = false;
            }
        }
    }

    /**
     * A pool thread together with its optional first task. The inherited lock is held
     * while the worker runs a task.
     *
     * <p>Deliberately not annotated with {@code @Data}: Lombok would derive
     * {@code equals}/{@code hashCode} from {@code firstTask} and {@code thread}, and
     * {@code firstTask} is cleared after the worker has been placed in the worker set.
     * That would change the hash and break removal from a hash-based set, so identity
     * semantics are used instead.
     */
    private class Worker extends ReentrantLock implements Runnable {

        private Runnable firstTask;
        private Thread thread;

        /**
         * Creates a worker and the (not yet started) thread that will run it.
         *
         * @param firstTask task to run before polling the queue; may be {@code null}
         */
        public Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = new Thread(this, "marion_" + UUID.randomUUID().toString());
        }

        @Override
        public void run() {
            runWorker(this);
        }

    }

    /** Default rejection policy: only prints a message; the rejected task is not run. */
    @Data
    private class RejectedHandler implements MRejectedHandler {
        @Override
        public void rejectedHandler(Runnable runnable, MThreadPoolExecutor mThreadPoolExecutor) {
            System.out.println("rejected handler start.");
        }
    }

}
